package com.codejiwei.core.demo

import org.apache.spark.sql.SparkSession

/**
 * Demo job that reads change events for `mydb.flink_score` through a data source
 * named `debezium` and appends the resulting rows to the `flink_score_bak` table
 * over JDBC.
 *
 * The job runs on a local two-thread Spark master and performs a single batch
 * read followed by a single batch write.
 *
 * NOTE(review): `debezium` is not one of Spark's built-in data source short names;
 * this presumably relies on a third-party connector on the classpath that registers
 * that name -- confirm, otherwise `load()` will fail at runtime.
 *
 * NOTE(review): credentials are hard-coded for demo purposes; externalize them
 * before using this anywhere but a local sandbox.
 */
object MySqlCdcSync {

  // Credentials shared by the CDC source and the JDBC sink.
  private val DbUser = "root"
  private val DbPassword = "123456"

  /**
   * Entry point: builds the Spark session, runs the read/write, and always stops
   * the session afterwards.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder.master("local[2]").appName("CDCExample").getOrCreate()

    // try/finally guarantees the session is stopped even when load() or save() throws.
    try {
      // Capture CDC events via Debezium.
      // NOTE(review): `spark.read` is a one-shot batch read, not a continuous stream --
      // confirm this is the intended behaviour for a CDC sync.
      val cdcDF = spark.read
        .format("debezium")
        .option("connector.class", "io.debezium.connector.mysql.MySqlConnector")
        .option("database.hostname", "localhost")
        .option("database.port", "3306")
        .option("database.user", DbUser)
        .option("database.password", DbPassword)
        .option("database.server.id", "1")
        .option("database.server.name", "cdc_example")
        .option("table.whitelist", "mydb.flink_score")
        .load()

      // Append the captured rows to the backup table.
      cdcDF.write.mode("append")
        .format("jdbc")
        .option("url", "jdbc:mysql://localhost/mydb")
        .option("dbtable", "flink_score_bak")
        .option("user", DbUser)
        .option("password", DbPassword)
        .save()
    } finally {
      spark.stop()
    }
  }

}
